{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "b49e6ba4-56d4-4019-9409-9d0fa6d33823",
     "showTitle": false,
     "title": ""
    }
   },
   "source": [
    "# SparkStreaming WordCount\n",
    "\n",
    "在这里，我们监控一个指定的路径，用SparkStreaming读取路径下的文件，并统计单词个数。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "beb9ec6a-f04b-4e68-b460-6ad14ae99c62",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.streaming import StreamingContext\n",
    "\n",
    "# 通过SparkContext对象，构建StreamingContext对象，每5秒钟执行一次\n",
    "ssc = StreamingContext(sc, 5)\n",
    "\n",
    "# SparkContext对象，SparkCore的入口\n",
    "# SparkSession对象，SparkSQL的入口\n",
    "# StreamingContext对象，Spark Streaming的入口\n",
    "ssc"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "763e8383-a4c7-4735-ae44-ce601967179c",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# 监听指定路径下的文件，读取一个时间窗口内发生变化的（新增、修改）文件进行处理\n",
    "lines = ssc.textFileStream(\"/mnt/databrickscontainer1/SparkStreaming\")\n",
    "\n",
    "# 按行读取文件、用空格拆分单词\n",
    "words = lines.flatMap(lambda x: x.split(\" \"))\n",
    "# 为每个单词计数为1\n",
    "pairs = words.map(lambda x: (x, 1))\n",
    "# 根据单词汇总单词的个数\n",
    "counts = pairs.reduceByKey(lambda a,b: a + b)\n",
    "# 打印最终的结果，得到WordCount\n",
    "counts.pprint()\n",
    "\n",
    "# lines.flatMap(lambda x: x.split(\" \")).map(lambda x: (x, 1)).reduceByKey(lambda a,b: a + b).pprint()\n",
    "\n",
    "# 启动流处理\n",
    "ssc.start()\n",
    "# 阻塞当前线程，让Streaming程序一直执行\n",
    "ssc.awaitTermination()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "ea9a66b4-d9b6-4e06-8d7d-ea4873a9b72f",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# 启动流处理\n",
    "ssc.start()\n",
    "# 阻塞当前线程，让Streaming程序一直执行\n",
    "ssc.awaitTermination()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "inputWidgets": {},
     "nuid": "f88df1dd-73de-4c85-9d1f-45e3899a0717",
     "showTitle": false,
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# 停止流处理程序\n",
    "ssc.stop()"
   ]
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "dashboards": [],
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 4
   },
   "notebookName": "SparkStreaming WordCount",
   "notebookOrigID": 2180139033493832,
   "widgets": {}
  },
  "kernelspec": {
   "display_name": "",
   "name": ""
  },
  "language_info": {
   "name": ""
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
